//-*-C++-*-
/***************************************************************************
 *
 *   Copyright (C) 2011 by Willem van Straten
 *   Licensed under the Academic Free License version 2.1
 *
 ***************************************************************************/

// dspsr/Signal/General/dsp/SingleThread.h

#ifndef __dspsr_SingleThread_h
#define __dspsr_SingleThread_h

#include "dsp/Pipeline.h"

#include "dsp/TwoBitCorrectionConfig.h"
#include "CommandLine.h"
#include "Functor.h"
#include "TextEditor.h"
#include "MJD.h"

class ThreadContext;

namespace dsp {

  class Source;
  class TimeSeries;
  class Operation;
  class Observation;
  class Scratch;
  class Memory;

  //! A single Pipeline thread
  /*! Declares the life cycle of one signal processing thread: configuration,
      construction, preparation, running and finishing of a sequence of
      Operation instances that process data obtained from a Source. */
  class SingleThread : public Pipeline
  {

  public:

    //! The MultiThread class may access private attributes
    friend class MultiThread;

    //! Stores configuration information shared between threads
    class Config;

    //! Constructor
    SingleThread ();

    //! Destructor
    ~SingleThread ();

    //! Set the configuration
    void set_configuration (Config*);

    //! Get the configuration
    Config* get_configuration();

    //! Set the Source from which data are obtained
    void set_source (Source*);

    //! Get the Source from which data are obtained
    Source* get_source ();

    //! Return true if this thread will run on the GPU
    bool run_on_gpu() const;

    //! Initialize resources required by the signal processing pipeline
    void initialize ();

    //! Build the signal processing pipeline
    void construct ();

    //! Append an Operation to the end of the pipeline
    void append (Operation*);

    //! Prepare the signal processing pipeline
    void prepare ();

    //! Seek such that output starts at the specified epoch
    void seek_epoch (const MJD&);

    //! Run through the data
    void run ();

    //! Share any necessary resources with the specified thread
    virtual void share (SingleThread*);

    //! Combine the results from another processing thread
    virtual void combine (const SingleThread*);

    //! Finish everything
    void finish ();

    //! Get the minimum number of samples required to process
    uint64_t get_minimum_samples () const;

    //! The verbose output stream shared by all operations
    /*! This member is named like, but is distinct from, std::cerr. */
    std::ostream cerr;

    //! Take and manage a new ostream instance
    void take_ostream (std::ostream* newlog);

    //! Identifier of this thread
    /*! NOTE(review): presumably assigned by MultiThread; confirm against
        the implementation. */
    unsigned thread_id;

    //! Set the CPU core affinity of this thread
    /*! NOTE(review): presumably binds this thread to the given core;
        confirm against the implementation. */
    void set_affinity (int core);

    //! Get a copy of the list of operations being performed
    /*! The list is returned by value, so modifying the result does not
        alter the pipeline.  The method is const-qualified so that it can
        be called through a pointer or reference to const SingleThread. */
    std::vector< Reference::To<Operation> > const get_operations () const { return operations; }

    //! Set the number of time samples per block, based on RAM constraints
    void set_block_size (uint64_t minimum_samples = 0, uint64_t input_overlap = 0);

    //! Create a new TimeSeries instance
    TimeSeries* new_time_series ();

    //! Alternative spelling that forwards to new_time_series
    TimeSeries* new_TimeSeries () { return new_time_series(); }
  
    //! Return the memory manager
    Memory* get_memory();

    //! Get pointer to the GPU stream
    void* get_gpu_stream();

  protected:

    //! Any special operations that must be performed at the end of data
    virtual void end_of_data ();

    //! Configure the source, if possible
    virtual void prepare (Source*);

    //! Configure overlap memory, if possible
    void configure_overlap (Source*, Memory*);

    //! Pointer to the ostream
    std::ostream* log;

    //! Processing thread states
    enum State
    {
      Fail,        //!< an error has occurred
      Idle,        //!< nothing happening
      Construct,   //!< request to construct
      Constructed, //!< construction completed
      Prepare,     //!< request to prepare
      Prepared,    //!< preparations completed
      Run,         //!< processing started
      Done,        //!< processing completed
      Joined       //!< completion acknowledged
    };

    //! Processing state
    State state;

    //! Error status
    Error error;

    //! State change communication
    ThreadContext* state_change;

    //! Mutex protecting source
    ThreadContext* source_context;

    //! Processing thread with whom sharing will occur
    SingleThread* colleague;

    //! Manages loading and unpacking
    Reference::To<Source> source;

    //! The TimeSeries into which the Source outputs data
    Reference::To<TimeSeries> source_output;

    //! Configuration information
    Reference::To<Config> config;

    //! The operations to be performed
    std::vector< Reference::To<Operation> > operations;

    //! Insert a dump point before the named operation
    void insert_dump_point (const std::string& transformation_name);

    //! Insert a mask before the named operation
    void insert_mask (const std::string& transformation_name);

    //! The scratch space shared by all operations
    Reference::To<Scratch> scratch;

    //! The minimum number of samples required to process
    uint64_t minimum_samples;

    //! Memory manager for device memory
    /*! NOTE(review): presumably used only when running on the GPU; confirm. */
    Reference::To<Memory> device_memory;

    //! Opaque handle to the GPU stream (see get_gpu_stream)
    void* gpu_stream;

    //! GPU device index
    /*! NOTE(review): presumably the device on which this thread runs; confirm. */
    int gpu_device;

    //! The name of the application running this thread
    const std::string& app();

    //! Number of samples in each block
    uint64_t block_size = 0;

    //! Number of bytes of data memory required to process each block
    uint64_t block_size_bytes = 0;

  };

  class Input;

  //! Per-thread configuration options
  /*! Holds the options that control how a SingleThread opens its input,
      sizes its blocks and reports progress.  The protected attributes are
      accessible to SingleThread via the friend declaration below. */
  class SingleThread::Config : public Reference::Able
  {
  public:

    //! Default constructor
    Config () = default;

    //! Add command line options
    virtual void add_options (CommandLine::Menu&);

    //! Create a new Source based on command line options
    /*! \param argc number of command line arguments
        \param argv command line arguments
        \return pointer to the Source that was opened

        NOTE(review): ownership of the returned Source is not visible in
        this header; confirm against the implementation. */
    Source* open (int argc, char** argv);

    //! Set block size to this factor times the minimum possible
    void set_times_minimum_ndat (unsigned);

    //! Get the factor by which the minimum possible block size is multiplied
    unsigned get_times_minimum_ndat () const { return times_minimum_ndat; }

    //! Set block_size to result in approximately this much RAM usage
    void set_maximum_RAM (uint64_t);

    //! Get the approximate maximum RAM usage
    uint64_t get_maximum_RAM () const { return maximum_RAM; }

    //! Set block_size to result in at least this much RAM usage
    void set_minimum_RAM (uint64_t);

    //! Get the minimum RAM usage
    uint64_t get_minimum_RAM () const { return minimum_RAM; }

    //! Set the minimum RAM size in MB
    void set_minimum_RAM_MB(const std::string&);

    //! Set the maximum RAM size, either in MB or as a multiple of the minimum
    void set_maximum_RAM_MB(const std::string&);

    //! Prepare the input according to the configuration
    virtual void prepare (Source*);

    //! Parse two-bit correction configuration options
    void twobit_parse (const std::string& text);

    //! Two-bit correction configuration options
    TwoBitCorrection::Config twobit_config;

    //! The name of the application
    std::string application_name;

    //! External function used to prepare the input each time it is opened
    Functor< void(Source*) > source_prepare;

    //! When unpacking FITS data, denormalize using DAT_SCL and DAT_OFFS
    bool apply_FITS_scale_and_offset = false;

    //! Run repeatedly on the same input
    bool run_repeatedly = false;

    //! Input files represent a single continuous observation
    bool force_contiguity = false;

    //! Use input-buffering to compensate for operation edge effects
    bool input_buffering = true;

    //! Use weighted time series to flag bad data
    bool weighted_time_series = true;

    //! Optimize the order in which data are stored (e.g. FPT vs TFP)
    bool optimal_order = true;

    //! Seek such that the first time sample output is at the specified (topocentric) MJD
    /*! Samples lost to dispersive delay are accounted. */
    MJD seek_epoch;

    //! Number of seconds to seek into data
    double seek_seconds = 0.0;

    //! Number of seconds to process from data
    double total_seconds = 0.0;

    //! Command line values are header params, not file names
    bool command_line_header = false;

    //! List all editor-accessible attributes of the observation
    bool list_attributes = false;

    //! The editor used to set Observation attributes via the command line
    TextEditor<Observation> editor;

    //! Report vital statistics
    bool report_vitals = true;

    //! Report the percentage finished
    bool report_done = true;

    //! Set the cuda devices to be used
    void set_cuda_device (std::string);

    //! Get the number of cuda devices to be used
    /*! The container size is converted explicitly to the unsigned return
        type, so that no implicit narrowing conversion takes place. */
    unsigned get_cuda_ndevice () const { return static_cast<unsigned>( cuda_device.size() ); }

    //! Set the number of CPU threads to be used
    void set_nthread (unsigned);

    //! Get the total number of threads
    unsigned get_total_nthread () const;

    //! Set the cpus on which each thread will run
    void set_affinity (std::string);

    //! Set the FFT library
    void set_fft_library (std::string);

    //! Dump points (see SingleThread::insert_dump_point)
    std::vector<std::string> dump_before;

    //! Mask points (see SingleThread::insert_mask)
    std::vector<std::string> mask_before;

    //! Get the number of buffers required to process the data
    unsigned get_nbuffers () const { return buffers; }

    //! Operate in quiet mode
    virtual void set_quiet ();

    //! Operate in verbose mode
    virtual void set_verbose ();

    //! Operate in very verbose mode
    virtual void set_very_verbose ();

    //! List the backends
    /*! NOTE(review): what is listed, and where it is written, is defined
        in the implementation file; confirm there. */
    void list_backends();

  protected:

    //! These attributes are set only by the SingleThread class
    friend class SingleThread;

    //! Application can make use of CUDA
    bool can_cuda = false;

    //! CUDA devices on which computations will take place
    std::vector<unsigned> cuda_device;

    //! Application can make use of multiple cores
    bool can_thread = false;

    //! CPUs on which threads will run
    std::vector<unsigned> affinity;

    //! Number of CPU threads
    unsigned nthread = 0;

    //! Number of buffers that have been created by new_time_series
    unsigned buffers = 0;

    //! Number of times that the input has been re-opened
    unsigned repeated = 0;

    //! Set block size to this factor times the minimum possible
    unsigned times_minimum_ndat = 1;

    //! Set block size to result in approximately this much RAM usage
    /*! Defaults to 256 * 1024 * 1024 (presumably bytes, i.e. 256 MiB;
        confirm against set_block_size). */
    uint64_t maximum_RAM = 256 * 1024 * 1024;

    //! Set block size to result in at least this much RAM usage
    uint64_t minimum_RAM = 0;
  };

}

#endif // !defined(__dspsr_SingleThread_h)
